﻿using Orleans;
using Orleans.Streams;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace iTool.ClusterComponent
{
    /// <summary>
    /// Base class for services that consume <c>string</c> messages from a stream.
    /// On activation it subscribes to the stream identified by the value of
    /// <c>GetKeyToString()</c> (converted with <c>iUtils.ToGuid</c>) and the configured
    /// queue namespace; on deactivation it unsubscribes again.
    /// </summary>
    // NOTE(review): in the Orleans API the ImplicitStreamSubscription argument is a stream
    // namespace; here it is the same literal as the stream provider name, and the class
    // also subscribes explicitly in OnActivateAsync — confirm the attribute is intended.
    [ImplicitStreamSubscription("iToolSimpleStream")]
    public abstract class iToolServiceSubscribeBase : iToolServiceBase
    {
        // Handle of the subscription created in OnActivateAsync; null while not subscribed.
        private StreamSubscriptionHandle<string> streamHandle;

        // Stream namespace passed to GetStream when subscribing.
        private readonly string queueNamespace;

        /// <summary>Creates the subscriber.</summary>
        /// <param name="queueNamespace">Stream namespace used when resolving the stream to subscribe to.</param>
        public iToolServiceSubscribeBase(string queueNamespace)
        {
            this.queueNamespace = queueNamespace;
        }

        /// <summary>
        /// Runs the base activation logic, then subscribes <see cref="OnNextAsync(string)"/> and
        /// <see cref="OnErrorAsync(Exception)"/> to the stream for this instance's key.
        /// </summary>
        public override async Task OnActivateAsync()
        {
            // Chain to the base class so any activation work it performs is not skipped
            // (the storage producer in this file chains the same way).
            await base.OnActivateAsync();

            IStreamProvider streamProvider = this.GetStreamProvider("iToolSimpleStream");
            string topic = this.GetKeyToString();
            Console.WriteLine("Subscribe topic:{0},queueNamespace:{1}", topic, this.queueNamespace);
            Guid streamId = iUtils.ToGuid(topic);
            var stream = streamProvider.GetStream<string>(streamId, this.queueNamespace);
            this.streamHandle = await stream.SubscribeAsync(this.OnNextAsync, this.OnErrorAsync);
        }

        /// <summary>
        /// Unsubscribes from the stream (best effort: failures are logged, not rethrown)
        /// and then runs the base deactivation logic.
        /// </summary>
        public override async Task OnDeactivateAsync()
        {
            try
            {
                if (this.streamHandle != null)
                {
                    await this.streamHandle.UnsubscribeAsync();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("iToolServiceSubscribeBase:" + ex.Message);
            }
            finally
            {
                // The handle must not be reused once an unsubscribe has been attempted.
                this.streamHandle = null;
            }

            // A failed unsubscribe must not prevent the base class from cleaning up.
            await base.OnDeactivateAsync();
        }

        /// <summary>Called when the stream reports an error.</summary>
        /// <param name="ex">The error reported by the stream.</param>
        protected abstract Task OnErrorAsync(Exception ex);

        /// <summary>Called for every message received from the stream.</summary>
        /// <param name="message">The received message.</param>
        protected abstract Task OnNextAsync(string message);

        // Adapter with the two-argument callback shape passed to SubscribeAsync; the sequence
        // token is ignored and the message is forwarded to the abstract single-argument overload.
        private async Task OnNextAsync(string message, StreamSequenceToken token = null) => await this.OnNextAsync(message);
    }
    /// <summary>
    /// Base class for services that publish messages to topics through
    /// <c>ProducerQueueHandler{T}</c> instances, which are created on first use and cached.
    /// </summary>
    public abstract class iToolServiceProducerBase : iToolServiceBase
    {
        // Cache of created handlers. The key includes the message type because a handler is
        // specific to its T: keying by topic alone made a second message type on the same
        // topic resolve to null and fail with a NullReferenceException.
        private readonly Dictionary<(string Topic, Type MessageType), object> producerStreamHandler;

        // Stream namespace passed to the base handler factory.
        private readonly string queueNamespace;

        // Stream provider name passed to the base handler factory.
        private readonly string providerName;

        // Default topic, taken from GetKeyToString() during activation.
        private string topic;

        /// <summary>Creates the producer.</summary>
        /// <param name="queueNamespace">Stream namespace used when creating handlers.</param>
        /// <param name="providerName">Stream provider name used when creating handlers.</param>
        public iToolServiceProducerBase(string queueNamespace, string providerName = "iToolSimpleStream")
        {
            this.providerName = providerName;
            this.queueNamespace = queueNamespace;
            this.producerStreamHandler = new Dictionary<(string Topic, Type MessageType), object>();
        }

        /// <summary>Runs the base activation logic and captures the default topic.</summary>
        public override async Task OnActivateAsync()
        {
            // Chain to the base class so any activation work it performs is not skipped.
            await base.OnActivateAsync();
            this.topic = this.GetKeyToString();
        }

        /// <summary>Sends <paramref name="message"/> to the default topic captured on activation.</summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="message">Message to send.</param>
        public async Task SendMessageAsync<T>(T message) => await this.GetProducerQueueHandler<T>(this.topic).SendMessageAsync(message);

        /// <summary>Sends <paramref name="message"/> to the given <paramref name="topic"/>.</summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="topic">Topic to send to; must not be null.</param>
        /// <param name="message">Message to send.</param>
        public async Task SendMessageAsync<T>(string topic, T message) => await this.GetProducerQueueHandler<T>(topic).SendMessageAsync(message);

        /// <summary>
        /// Returns the cached handler for the topic and message type, creating it through the
        /// base class factory on first use.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="topic"/> is null.</exception>
        private ProducerQueueHandler<T> GetProducerQueueHandler<T>(string topic)
        {
            // The original string-keyed dictionary rejected a null topic; keep that contract.
            if (topic == null)
            {
                throw new ArgumentNullException(nameof(topic));
            }

            var cacheKey = (topic, typeof(T));
            if (!this.producerStreamHandler.TryGetValue(cacheKey, out object handler))
            {
                // Log the topic actually being created, not the default one.
                Console.WriteLine("Producer topic:{0},queueNamespace:{1}", topic, this.queueNamespace);
                handler = this.GetProducerQueueHandler<T>(topic, this.queueNamespace, this.providerName);
                this.producerStreamHandler.Add(cacheKey, handler);
            }

            return (ProducerQueueHandler<T>)handler;
        }
    }


    /// <summary>
    /// Base class for services built on <c>iToolServiceBase{TState}</c> that consume
    /// <c>string</c> messages from a stream. On activation it subscribes to the stream
    /// identified by the value of <c>GetKeyToString()</c> (converted with
    /// <c>iUtils.ToGuid</c>) and the configured queue namespace; on deactivation it unsubscribes.
    /// </summary>
    // NOTE(review): in the Orleans API the ImplicitStreamSubscription argument is a stream
    // namespace; here it is the same literal as the stream provider name, and the class
    // also subscribes explicitly in OnActivateAsync — confirm the attribute is intended.
    [ImplicitStreamSubscription("iToolSimpleStream")]
    public abstract class iToolServiceSubscribeBase<TState> : iToolServiceBase<TState>
    {
        // Handle of the subscription created in OnActivateAsync; null while not subscribed.
        private StreamSubscriptionHandle<string> streamHandle;

        // Stream namespace passed to GetStream when subscribing.
        private readonly string queueNamespace;

        /// <summary>Creates the subscriber.</summary>
        /// <param name="queueNamespace">Stream namespace used when resolving the stream to subscribe to.</param>
        public iToolServiceSubscribeBase(string queueNamespace)
        {
            this.queueNamespace = queueNamespace;
        }

        /// <summary>
        /// Runs the base activation logic, then subscribes <see cref="OnNextAsync(string)"/> and
        /// <see cref="OnErrorAsync(Exception)"/> to the stream for this instance's key.
        /// </summary>
        public override async Task OnActivateAsync()
        {
            // Chain to the base class so any activation work it performs is not skipped.
            await base.OnActivateAsync();

            IStreamProvider streamProvider = this.GetStreamProvider("iToolSimpleStream");
            string topic = this.GetKeyToString();
            Console.WriteLine("Subscribe topic:{0},queueNamespace:{1}", topic, this.queueNamespace);
            Guid streamId = iUtils.ToGuid(topic);
            var stream = streamProvider.GetStream<string>(streamId, this.queueNamespace);
            this.streamHandle = await stream.SubscribeAsync(this.OnNextAsync, this.OnErrorAsync);
        }

        /// <summary>
        /// Unsubscribes from the stream (best effort: failures are logged, not rethrown,
        /// matching the non-generic subscriber) and then runs the base deactivation logic.
        /// </summary>
        public override async Task OnDeactivateAsync()
        {
            try
            {
                if (this.streamHandle != null)
                {
                    await this.streamHandle.UnsubscribeAsync();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("iToolServiceSubscribeBase:" + ex.Message);
            }
            finally
            {
                // The handle must not be reused once an unsubscribe has been attempted.
                this.streamHandle = null;
            }

            // A failed unsubscribe must not prevent the base class from cleaning up.
            await base.OnDeactivateAsync();
        }

        /// <summary>Called when the stream reports an error.</summary>
        /// <param name="ex">The error reported by the stream.</param>
        protected abstract Task OnErrorAsync(Exception ex);

        /// <summary>Called for every message received from the stream.</summary>
        /// <param name="message">The received message.</param>
        protected abstract Task OnNextAsync(string message);

        // Adapter with the two-argument callback shape passed to SubscribeAsync; the sequence
        // token is ignored and the message is forwarded to the abstract single-argument overload.
        private async Task OnNextAsync(string message, StreamSequenceToken token = null) => await this.OnNextAsync(message);
    }

    /// <summary>
    /// Base class for services built on <c>iToolServiceBase{TState}</c> that publish messages
    /// to topics through <c>ProducerQueueHandler{T}</c> instances, which are created on first
    /// use and cached.
    /// </summary>
    public abstract class iToolServiceProducerBase<TState> : iToolServiceBase<TState>
    {
        // Cache of created handlers. The key includes the message type because a handler is
        // specific to its T: keying by topic alone made a second message type on the same
        // topic resolve to null and fail with a NullReferenceException.
        private readonly Dictionary<(string Topic, Type MessageType), object> producerStreamHandler;

        // Stream namespace passed to the base handler factory.
        private readonly string queueNamespace;

        // Stream provider name passed to the base handler factory.
        private readonly string providerName;

        // Default topic, taken from GetKeyToString() during activation.
        private string topic;

        /// <summary>Creates the producer.</summary>
        /// <param name="queueNamespace">Stream namespace used when creating handlers.</param>
        /// <param name="providerName">Stream provider name used when creating handlers.</param>
        public iToolServiceProducerBase(string queueNamespace, string providerName = "iToolSimpleStream")
        {
            this.providerName = providerName;
            this.queueNamespace = queueNamespace;
            this.producerStreamHandler = new Dictionary<(string Topic, Type MessageType), object>();
        }

        /// <summary>Runs the base activation logic and captures the default topic.</summary>
        public override async Task OnActivateAsync()
        {
            // Chain to the base class so any activation work it performs is not skipped.
            await base.OnActivateAsync();
            this.topic = this.GetKeyToString();
        }

        /// <summary>Sends <paramref name="message"/> to the default topic captured on activation.</summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="message">Message to send.</param>
        public async Task SendMessageAsync<T>(T message) => await this.GetProducerQueueHandler<T>(this.topic).SendMessageAsync(message);

        /// <summary>Sends <paramref name="message"/> to the given <paramref name="topic"/>.</summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="topic">Topic to send to; must not be null.</param>
        /// <param name="message">Message to send.</param>
        public async Task SendMessageAsync<T>(string topic, T message) => await this.GetProducerQueueHandler<T>(topic).SendMessageAsync(message);

        /// <summary>
        /// Returns the cached handler for the topic and message type, creating it through the
        /// base class factory on first use.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="topic"/> is null.</exception>
        private ProducerQueueHandler<T> GetProducerQueueHandler<T>(string topic)
        {
            // The original string-keyed dictionary rejected a null topic; keep that contract.
            if (topic == null)
            {
                throw new ArgumentNullException(nameof(topic));
            }

            var cacheKey = (topic, typeof(T));
            if (!this.producerStreamHandler.TryGetValue(cacheKey, out object handler))
            {
                // Log the topic actually being created, not the default one.
                Console.WriteLine("Producer topic:{0},queueNamespace:{1}", topic, this.queueNamespace);
                handler = this.GetProducerQueueHandler<T>(topic, this.queueNamespace, this.providerName);
                this.producerStreamHandler.Add(cacheKey, handler);
            }

            return (ProducerQueueHandler<T>)handler;
        }
    }


    /// <summary>
    /// Base class for services built on <c>iToolServiceBase{TState, TEventBase}</c> that
    /// consume <c>string</c> messages from a stream. On activation it subscribes to the stream
    /// identified by the value of <c>GetKeyToString()</c> (converted with
    /// <c>iUtils.ToGuid</c>) and the configured queue namespace; on deactivation it unsubscribes.
    /// </summary>
    // NOTE(review): in the Orleans API the ImplicitStreamSubscription argument is a stream
    // namespace; here it is the same literal as the stream provider name, and the class
    // also subscribes explicitly in OnActivateAsync — confirm the attribute is intended.
    [ImplicitStreamSubscription("iToolSimpleStream")]
    public abstract class iToolServiceSubscribeBase<TState, TEventBase> : iToolServiceBase<TState, TEventBase>
        where TState : class, new()
        where TEventBase : class
    {
        // Handle of the subscription created in OnActivateAsync; null while not subscribed.
        private StreamSubscriptionHandle<string> streamHandle;

        // Stream namespace passed to GetStream when subscribing.
        private readonly string queueNamespace;

        /// <summary>Creates the subscriber.</summary>
        /// <param name="dueTime">Forwarded unchanged to the base class constructor.</param>
        /// <param name="queueNamespace">Stream namespace used when resolving the stream to subscribe to.</param>
        public iToolServiceSubscribeBase(int dueTime, string queueNamespace) : base(dueTime)
        {
            this.queueNamespace = queueNamespace;
        }

        /// <summary>
        /// Runs the base activation logic, then subscribes <see cref="OnNextAsync(string)"/> and
        /// <see cref="OnErrorAsync(Exception)"/> to the stream for this instance's key.
        /// </summary>
        public override async Task OnActivateAsync()
        {
            // Chain to the base class so any activation work it performs is not skipped.
            await base.OnActivateAsync();

            IStreamProvider streamProvider = this.GetStreamProvider("iToolSimpleStream");
            string topic = this.GetKeyToString();
            Console.WriteLine("Subscribe topic:{0},queueNamespace:{1}", topic, this.queueNamespace);
            Guid streamId = iUtils.ToGuid(topic);
            var stream = streamProvider.GetStream<string>(streamId, this.queueNamespace);
            this.streamHandle = await stream.SubscribeAsync(this.OnNextAsync, this.OnErrorAsync);
        }

        /// <summary>
        /// Unsubscribes from the stream (best effort: failures are logged, not rethrown,
        /// matching the non-generic subscriber) and then runs the base deactivation logic.
        /// </summary>
        public override async Task OnDeactivateAsync()
        {
            try
            {
                if (this.streamHandle != null)
                {
                    await this.streamHandle.UnsubscribeAsync();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("iToolServiceSubscribeBase:" + ex.Message);
            }
            finally
            {
                // The handle must not be reused once an unsubscribe has been attempted.
                this.streamHandle = null;
            }

            // A failed unsubscribe must not prevent the base class from cleaning up.
            await base.OnDeactivateAsync();
        }

        /// <summary>Called when the stream reports an error.</summary>
        /// <param name="ex">The error reported by the stream.</param>
        protected abstract Task OnErrorAsync(Exception ex);

        /// <summary>Called for every message received from the stream.</summary>
        /// <param name="message">The received message.</param>
        protected abstract Task OnNextAsync(string message);

        // Adapter with the two-argument callback shape passed to SubscribeAsync; the sequence
        // token is ignored and the message is forwarded to the abstract single-argument overload.
        private async Task OnNextAsync(string message, StreamSequenceToken token = null) => await this.OnNextAsync(message);
    }

    /// <summary>
    /// Base class for services built on <c>iToolServiceBase{TState, TEventBase}</c> that
    /// publish messages to topics through <c>ProducerQueueHandler{T}</c> instances, which are
    /// created on first use and cached.
    /// </summary>
    public abstract class iToolServiceProducerBase<TState, TEventBase> : iToolServiceBase<TState, TEventBase>
        where TState : class, new()
        where TEventBase : class
    {
        // Cache of created handlers. The key includes the message type because a handler is
        // specific to its T: keying by topic alone made a second message type on the same
        // topic resolve to null and fail with a NullReferenceException.
        private readonly Dictionary<(string Topic, Type MessageType), object> producerStreamHandler;

        // Stream namespace passed to the base handler factory.
        private readonly string queueNamespace;

        // Stream provider name passed to the base handler factory.
        private readonly string providerName;

        // Default topic, taken from GetKeyToString() during activation.
        private string topic;

        /// <summary>Creates the producer.</summary>
        /// <param name="dueTime">Forwarded unchanged to the base class constructor.</param>
        /// <param name="queueNamespace">Stream namespace used when creating handlers.</param>
        /// <param name="providerName">Stream provider name used when creating handlers.</param>
        public iToolServiceProducerBase(int dueTime, string queueNamespace, string providerName = "iToolSimpleStream") : base(dueTime)
        {
            this.providerName = providerName;
            this.queueNamespace = queueNamespace;
            this.producerStreamHandler = new Dictionary<(string Topic, Type MessageType), object>();
        }

        /// <summary>Runs the base activation logic, captures the default topic and logs it.</summary>
        public override async Task OnActivateAsync()
        {
            // Chain to the base class so any activation work it performs is not skipped.
            await base.OnActivateAsync();
            this.topic = this.GetKeyToString();
            Console.WriteLine("Producer topic:{0},queueNamespace:{1}", this.topic, this.queueNamespace);
        }

        /// <summary>Sends <paramref name="message"/> to the default topic captured on activation.</summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="message">Message to send.</param>
        public async Task SendMessageAsync<T>(T message) => await this.GetProducerQueueHandler<T>(this.topic).SendMessageAsync(message);

        /// <summary>Sends <paramref name="message"/> to the given <paramref name="topic"/>.</summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="topic">Topic to send to; must not be null.</param>
        /// <param name="message">Message to send.</param>
        public async Task SendMessageAsync<T>(string topic, T message) => await this.GetProducerQueueHandler<T>(topic).SendMessageAsync(message);

        /// <summary>
        /// Returns the cached handler for the topic and message type, creating it through the
        /// base class factory on first use.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="topic"/> is null.</exception>
        private ProducerQueueHandler<T> GetProducerQueueHandler<T>(string topic)
        {
            // The original string-keyed dictionary rejected a null topic; keep that contract.
            if (topic == null)
            {
                throw new ArgumentNullException(nameof(topic));
            }

            var cacheKey = (topic, typeof(T));
            if (!this.producerStreamHandler.TryGetValue(cacheKey, out object handler))
            {
                // This is the producer side, so the log says "Producer" (was a copy/paste of "Subscribe").
                Console.WriteLine("Producer topic:{0},queueNamespace:{1}", topic, this.queueNamespace);
                handler = this.GetProducerQueueHandler<T>(topic, this.queueNamespace, this.providerName);
                this.producerStreamHandler.Add(cacheKey, handler);
            }

            return (ProducerQueueHandler<T>)handler;
        }
    }



    /// <summary>
    /// Base class for services built on <c>iToolServiceStorageBase{TState}</c> that consume
    /// <c>string</c> messages from a stream. On activation it subscribes to the stream
    /// identified by the value of <c>GetKeyToString()</c> (converted with
    /// <c>iUtils.ToGuid</c>) and the configured queue namespace; on deactivation it unsubscribes.
    /// </summary>
    // NOTE(review): in the Orleans API the ImplicitStreamSubscription argument is a stream
    // namespace; here it is the same literal as the stream provider name, and the class
    // also subscribes explicitly in OnActivateAsync — confirm the attribute is intended.
    [ImplicitStreamSubscription("iToolSimpleStream")]
    public abstract class iToolServiceStorageSubscribeBase<TState> : iToolServiceStorageBase<TState>
    {
        // Handle of the subscription created in OnActivateAsync; null while not subscribed.
        private StreamSubscriptionHandle<string> streamHandle;

        // Stream namespace passed to GetStream when subscribing.
        private readonly string queueNamespace;

        /// <summary>Creates the subscriber.</summary>
        /// <param name="queueNamespace">Stream namespace used when resolving the stream to subscribe to.</param>
        public iToolServiceStorageSubscribeBase(string queueNamespace)
        {
            this.queueNamespace = queueNamespace;
        }

        /// <summary>
        /// Runs the base activation logic, then subscribes <see cref="OnNextAsync(string)"/> and
        /// <see cref="OnErrorAsync(Exception)"/> to the stream for this instance's key.
        /// </summary>
        public override async Task OnActivateAsync()
        {
            // The storage producer in this file chains to the same base class on activation;
            // do the same here so the storage base's activation work is not skipped.
            await base.OnActivateAsync();

            IStreamProvider streamProvider = this.GetStreamProvider("iToolSimpleStream");
            string topic = this.GetKeyToString();
            Console.WriteLine("Subscribe topic:{0},queueNamespace:{1}", topic, this.queueNamespace);
            Guid streamId = iUtils.ToGuid(topic);
            var stream = streamProvider.GetStream<string>(streamId, this.queueNamespace);
            this.streamHandle = await stream.SubscribeAsync(this.OnNextAsync, this.OnErrorAsync);
        }

        /// <summary>
        /// Unsubscribes from the stream (best effort: failures are logged, not rethrown,
        /// matching the non-generic subscriber) and then runs the base deactivation logic.
        /// </summary>
        public override async Task OnDeactivateAsync()
        {
            try
            {
                if (this.streamHandle != null)
                {
                    await this.streamHandle.UnsubscribeAsync();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("iToolServiceStorageSubscribeBase:" + ex.Message);
            }
            finally
            {
                // The handle must not be reused once an unsubscribe has been attempted.
                this.streamHandle = null;
            }

            // A failed unsubscribe must not prevent the base class from cleaning up.
            await base.OnDeactivateAsync();
        }

        /// <summary>Called when the stream reports an error.</summary>
        /// <param name="ex">The error reported by the stream.</param>
        protected abstract Task OnErrorAsync(Exception ex);

        /// <summary>Called for every message received from the stream.</summary>
        /// <param name="message">The received message.</param>
        protected abstract Task OnNextAsync(string message);

        // Adapter with the two-argument callback shape passed to SubscribeAsync; the sequence
        // token is ignored and the message is forwarded to the abstract single-argument overload.
        private async Task OnNextAsync(string message, StreamSequenceToken token = null) => await this.OnNextAsync(message);
    }

    /// <summary>
    /// Base class for services built on <c>iToolServiceStorageBase{TState}</c> that publish
    /// messages to topics through <c>ProducerQueueHandler{T}</c> instances, which are created
    /// on first use and cached.
    /// </summary>
    public abstract class iToolServiceStorageProducerBase<TState> : iToolServiceStorageBase<TState>
    {
        // Cache of created handlers. The key includes the message type because a handler is
        // specific to its T: keying by topic alone made a second message type on the same
        // topic resolve to null and fail with a NullReferenceException.
        private readonly Dictionary<(string Topic, Type MessageType), object> producerStreamHandler;

        // Stream namespace passed to the base handler factory.
        private readonly string queueNamespace;

        // Stream provider name passed to the base handler factory.
        private readonly string providerName;

        // Default topic, taken from GetKeyToString() during activation.
        private string topic;

        /// <summary>Creates the producer.</summary>
        /// <param name="queueNamespace">Stream namespace used when creating handlers.</param>
        /// <param name="providerName">Stream provider name used when creating handlers.</param>
        public iToolServiceStorageProducerBase(string queueNamespace, string providerName = "iToolSimpleStream")
        {
            this.providerName = providerName;
            this.queueNamespace = queueNamespace;
            this.producerStreamHandler = new Dictionary<(string Topic, Type MessageType), object>();
        }

        /// <summary>Runs the base activation logic and captures the default topic.</summary>
        public override async Task OnActivateAsync()
        {
            await base.OnActivateAsync();
            this.topic = this.GetKeyToString();
        }

        /// <summary>Sends <paramref name="message"/> to the default topic captured on activation.</summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="message">Message to send.</param>
        public async Task SendMessageAsync<T>(T message) => await this.GetProducerQueueHandler<T>(this.topic).SendMessageAsync(message);

        /// <summary>Sends <paramref name="message"/> to the given <paramref name="topic"/>.</summary>
        /// <typeparam name="T">Message type.</typeparam>
        /// <param name="topic">Topic to send to; must not be null.</param>
        /// <param name="message">Message to send.</param>
        public async Task SendMessageAsync<T>(string topic, T message) => await this.GetProducerQueueHandler<T>(topic).SendMessageAsync(message);

        /// <summary>
        /// Returns the cached handler for the topic and message type, creating it through the
        /// base class factory on first use.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="topic"/> is null.</exception>
        private ProducerQueueHandler<T> GetProducerQueueHandler<T>(string topic)
        {
            // The original string-keyed dictionary rejected a null topic; keep that contract.
            if (topic == null)
            {
                throw new ArgumentNullException(nameof(topic));
            }

            var cacheKey = (topic, typeof(T));
            if (!this.producerStreamHandler.TryGetValue(cacheKey, out object handler))
            {
                // This is the producer side, so the log says "Producer" (was a copy/paste of "Subscribe").
                Console.WriteLine("Producer topic:{0},queueNamespace:{1}", topic, this.queueNamespace);
                handler = this.GetProducerQueueHandler<T>(topic, this.queueNamespace, this.providerName);
                this.producerStreamHandler.Add(cacheKey, handler);
            }

            return (ProducerQueueHandler<T>)handler;
        }
    }

}
